/*
 * Copyright © OpenAtom Foundation.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *      http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package io.iec.edp.caf.commons.core;

//import io.iec.edp.caf.commons.core.api.DataDeserializer;
import io.iec.edp.caf.commons.core.api.CafSerialization;
import io.iec.edp.caf.commons.core.api.DataSerializer;
import io.iec.edp.caf.commons.core.entity.TypeConstants;
import io.iec.edp.caf.commons.core.enums.SerializeType;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import lombok.SneakyThrows;
import lombok.var;
import org.springframework.core.env.Environment;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Factory for serializers and deserializers.
 *
 * <p>{@link CafSerialization} implementations are discovered through
 * {@link ServiceLoader} when this class is initialized and are registered by
 * their id. Every lookup instantiates the registered implementation class anew
 * and asks that instance for the requested serializer or deserializer.
 */
public class SerializerFactory {

    /** Environment property that selects the serialization type used by the no-type overloads. */
    private static final String SERIALIZE_TYPE_PROPERTY = "serialize.type";

    /**
     * Registry of serialization implementation classes keyed by serialization id.
     *
     * <p>Must be declared before the static initializer below: static
     * initializers run in textual order and the initializer registers into
     * this map.
     */
    private static final Map<Byte, Class<?>> serializationMap = new ConcurrentHashMap<>();

    static {
        // Register every CafSerialization implementation visible to the ServiceLoader.
        for (CafSerialization operation : ServiceLoader.load(CafSerialization.class)) {
            SerializerFactory.registerSerialization(operation.getId(), operation.getClass());
        }
    }

    /**
     * Returns the serializer for the given serialization type.
     *
     * @param type serialization type (Json, Protobuf, ...)
     * @return the serializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getSerializer(SerializeType type) {
        return getSerializationObj(type).getSerializer();
    }

    /**
     * Returns the serializer for the type configured by the {@code serialize.type}
     * environment property. Json is used when the property is unset or unrecognized.
     *
     * @return the serializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getSerializer() {
        return getSerializer(resolveConfiguredType());
    }

    /**
     * Returns a serializer for the given serialization type, bound to an output stream.
     *
     * @param type   serialization type (Json, Protobuf, ...)
     * @param stream output stream handed to {@link CafSerialization#getSerializer(OutputStream)}
     * @return the serializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getSerializer(SerializeType type, OutputStream stream) {
        return getSerializationObj(type).getSerializer(stream);
    }

    /**
     * Returns a serializer for the type configured by the {@code serialize.type}
     * environment property, bound to an output stream. Json is used when the
     * property is unset or unrecognized.
     *
     * <p>The checked exceptions in the signature are retained so that existing
     * callers which catch them keep compiling.
     *
     * @param stream output stream handed to {@link CafSerialization#getSerializer(OutputStream)}
     * @return the serializer supplied by the matching {@link CafSerialization}
     * @throws ClassNotFoundException declared for backward compatibility
     * @throws IllegalAccessException if the implementation's no-arg constructor is not accessible
     * @throws InstantiationException if the implementation class cannot be instantiated
     * @throws IllegalStateException  if no implementation is registered for the type
     */
    public static DataSerializer getSerializer(OutputStream stream) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        return getSerializer(resolveConfiguredType(), stream);
    }

    /**
     * Returns the deserializer for the given serialization type.
     *
     * @param type serialization type (Json, Protobuf, ...)
     * @return the deserializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getDeserializer(SerializeType type) {
        return getSerializationObj(type).getDerializer();
    }

    /**
     * Returns the deserializer for the type configured by the {@code serialize.type}
     * environment property. Json is used when the property is unset or unrecognized.
     *
     * <p>The checked exceptions in the signature are retained so that existing
     * callers which catch them keep compiling.
     *
     * @return the deserializer supplied by the matching {@link CafSerialization}
     * @throws ClassNotFoundException declared for backward compatibility
     * @throws IllegalAccessException if the implementation's no-arg constructor is not accessible
     * @throws InstantiationException if the implementation class cannot be instantiated
     * @throws IllegalStateException  if no implementation is registered for the type
     */
    public static DataSerializer getDeserializer() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        return getDeserializer(resolveConfiguredType());
    }

    /**
     * Returns a deserializer for the given serialization type, bound to an input stream.
     *
     * @param type   serialization type (Json, Protobuf, ...)
     * @param stream input stream handed to the implementation's {@code getDerializer(InputStream)}
     * @return the deserializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getDeserializer(SerializeType type, InputStream stream) {
        return getSerializationObj(type).getDerializer(stream);
    }

    /**
     * Returns a deserializer for the type configured by the {@code serialize.type}
     * environment property, bound to an input stream. Json is used when the
     * property is unset or unrecognized.
     *
     * @param stream input stream handed to the implementation's {@code getDerializer(InputStream)}
     * @return the deserializer supplied by the matching {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    public static DataSerializer getDeserializer(InputStream stream) {
        return getDeserializer(resolveConfiguredType(), stream);
    }

    /**
     * Registers a serialization implementation class under the given id,
     * replacing any class previously registered under that id.
     *
     * @param typeKey       serialization id
     * @param serialization implementation class; it is instantiated through its
     *                      no-arg constructor and cast to {@link CafSerialization} on lookup
     */
    protected static void registerSerialization(Byte typeKey, Class serialization) {
        // put() already overwrites an existing mapping, and the concurrent map
        // makes the write safe without an external lock.
        serializationMap.put(typeKey, serialization);
    }

    /**
     * Reads the {@code serialize.type} property from the Spring {@link Environment}
     * and maps it to a {@link SerializeType}.
     */
    private static SerializeType resolveConfiguredType() {
        Environment environment = SpringBeanUtils.getBean(Environment.class);
        return getSerializationByType(environment.getProperty(SERIALIZE_TYPE_PROPERTY));
    }

    /**
     * Maps a configured type name to a {@link SerializeType}.
     *
     * @param type configured name; may be {@code null} when the property is not set
     * @return {@link SerializeType#Protobuf} for {@code "Protobuf"}, otherwise
     *         {@link SerializeType#Json}
     */
    private static SerializeType getSerializationByType(String type) {
        // Constant-first equals keeps this null-safe; a switch on a null String
        // would throw NullPointerException instead of falling back to Json.
        if ("Protobuf".equals(type)) {
            return SerializeType.Protobuf;
        }
        return SerializeType.Json;
    }

    /**
     * Creates a new instance of the implementation registered for the given type.
     * Types other than Protobuf resolve to the Json implementation.
     *
     * @param type serialization type
     * @return a freshly instantiated {@link CafSerialization}
     * @throws IllegalStateException if no implementation is registered for the type
     */
    @SneakyThrows
    private static CafSerialization getSerializationObj(SerializeType type) {
        final Class<?> clazz;
        switch (type) {
            case Protobuf:
                clazz = serializationMap.get(TypeConstants.PROTOBUF_SERIALIZATION_ID);
                break;
            case Json:
            default:
                clazz = serializationMap.get(TypeConstants.JSON_SERIALIZATION_ID);
                break;
        }

        if (clazz == null) {
            // Fail with context rather than a bare NullPointerException.
            throw new IllegalStateException(
                    "No CafSerialization implementation registered for serialize type " + type);
        }

        // Reflection failures are rethrown unchanged via @SneakyThrows.
        return (CafSerialization) clazz.newInstance();
    }
}
